1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.util;
17  
18  import static org.junit.Assert.assertEquals;
19  import static org.junit.Assert.assertTrue;
20  
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.Collections;
24  import java.util.List;
25  import java.util.concurrent.CountDownLatch;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.junit.Test;
29  
30  import rx.Scheduler;
31  import rx.Subscription;
32  import rx.functions.Action0;
33  import rx.functions.Func1;
34  import rx.schedulers.Schedulers;
35  
36  public class IndexedRingBufferTest {
37  
38      @Test
39      public void add() {
40          IndexedRingBuffer<LSubscription> list = IndexedRingBuffer.getInstance();
41          list.add(new LSubscription(1));
42          list.add(new LSubscription(2));
43          final AtomicInteger c = new AtomicInteger();
44  
45          list.forEach(newCounterAction(c));
46          assertEquals(2, c.get());
47      }
48  
49      @Test
50      public void removeEnd() {
51          IndexedRingBuffer<LSubscription> list = IndexedRingBuffer.getInstance();
52          list.add(new LSubscription(1));
53          int n2 = list.add(new LSubscription(2));
54  
55          final AtomicInteger c = new AtomicInteger();
56          list.forEach(newCounterAction(c));
57          assertEquals(2, c.get());
58  
59          list.remove(n2);
60  
61          final AtomicInteger c2 = new AtomicInteger();
62          list.forEach(newCounterAction(c2));
63          assertEquals(1, c2.get());
64      }
65  
66      @Test
67      public void removeMiddle() {
68          IndexedRingBuffer<LSubscription> list = IndexedRingBuffer.getInstance();
69          list.add(new LSubscription(1));
70          int n2 = list.add(new LSubscription(2));
71          list.add(new LSubscription(3));
72  
73          list.remove(n2);
74  
75          final AtomicInteger c = new AtomicInteger();
76          list.forEach(newCounterAction(c));
77          assertEquals(2, c.get());
78      }
79  
80      @Test
81      public void addRemoveAdd() {
82          IndexedRingBuffer<String> list = IndexedRingBuffer.getInstance();
83          list.add("one");
84          list.add("two");
85          list.add("three");
86          ArrayList<String> values = new ArrayList<String>();
87          list.forEach(accumulate(values));
88          assertEquals(3, values.size());
89          assertEquals("one", values.get(0));
90          assertEquals("two", values.get(1));
91          assertEquals("three", values.get(2));
92  
93          list.remove(1);
94  
95          values.clear();
96          list.forEach(accumulate(values));
97          assertEquals(2, values.size());
98          assertEquals("one", values.get(0));
99          assertEquals("three", values.get(1));
100 
101         list.add("four");
102 
103         values.clear();
104         list.forEach(accumulate(values));
105         assertEquals(3, values.size());
106         assertEquals("one", values.get(0));
107         assertEquals("four", values.get(1));
108         assertEquals("three", values.get(2));
109 
110         final AtomicInteger c = new AtomicInteger();
111         list.forEach(newCounterAction(c));
112         assertEquals(3, c.get());
113     }
114 
115     @Test
116     public void addThousands() {
117         String s = "s";
118         IndexedRingBuffer<String> list = IndexedRingBuffer.getInstance();
119         for (int i = 0; i < 10000; i++) {
120             list.add(s);
121         }
122         AtomicInteger c = new AtomicInteger();
123         list.forEach(newCounterAction(c));
124         assertEquals(10000, c.get());
125 
126         list.remove(5000);
127         c.set(0);
128         list.forEach(newCounterAction(c));
129         assertEquals(9999, c.get());
130 
131         list.add("one");
132         list.add("two");
133         c.set(0);
134 
135         //        list.forEach(print());
136 
137         list.forEach(newCounterAction(c));
138         assertEquals(10001, c.get());
139     }
140 
141     @Test
142     public void testForEachWithIndex() {
143         IndexedRingBuffer<String> buffer = IndexedRingBuffer.getInstance();
144         buffer.add("zero");
145         buffer.add("one");
146         buffer.add("two");
147         buffer.add("three");
148 
149         final ArrayList<String> list = new ArrayList<String>();
150         int nextIndex = buffer.forEach(accumulate(list));
151         assertEquals(4, list.size());
152         assertEquals(list, Arrays.asList("zero", "one", "two", "three"));
153         assertEquals(0, nextIndex);
154 
155         list.clear();
156         nextIndex = buffer.forEach(accumulate(list), 0);
157         assertEquals(4, list.size());
158         assertEquals(list, Arrays.asList("zero", "one", "two", "three"));
159         assertEquals(0, nextIndex);
160 
161         list.clear();
162         nextIndex = buffer.forEach(accumulate(list), 2);
163         assertEquals(4, list.size());
164         assertEquals(list, Arrays.asList("two", "three", "zero", "one"));
165         assertEquals(2, nextIndex); // 2, 3, 0, 1
166 
167         list.clear();
168         nextIndex = buffer.forEach(accumulate(list), 3);
169         assertEquals(4, list.size());
170         assertEquals(list, Arrays.asList("three", "zero", "one", "two"));
171         assertEquals(3, nextIndex); // 3, 0, 1, 2
172 
173         list.clear();
174         nextIndex = buffer.forEach(new Func1<String, Boolean>() {
175 
176             @Override
177             public Boolean call(String t1) {
178                 list.add(t1);
179                 return false;
180             }
181 
182         }, 3);
183         assertEquals(1, list.size());
184         assertEquals(list, Arrays.asList("three"));
185         assertEquals(3, nextIndex); // we ended early so we'll go back to this index again next time
186 
187         list.clear();
188         nextIndex = buffer.forEach(new Func1<String, Boolean>() {
189             int i = 0;
190 
191             @Override
192             public Boolean call(String t1) {
193                 list.add(t1);
194                 if (i++ == 2) {
195                     return false;
196                 } else {
197                     return true;
198                 }
199             }
200 
201         }, 0);
202         assertEquals(3, list.size());
203         assertEquals(list, Arrays.asList("zero", "one", "two"));
204         assertEquals(2, nextIndex); // 0, 1, 2 (// we ended early so we'll go back to the last index again next time)
205     }
206 
207     @Test
208     public void testForEachAcrossSections() {
209         IndexedRingBuffer<Integer> buffer = IndexedRingBuffer.getInstance();
210         for (int i = 0; i < 10000; i++) {
211             buffer.add(i);
212         }
213 
214         final ArrayList<Integer> list = new ArrayList<Integer>();
215         int nextIndex = buffer.forEach(accumulate(list), 5000);
216         assertEquals(10000, list.size());
217         assertEquals(Integer.valueOf(5000), list.get(0));
218         assertEquals(Integer.valueOf(9999), list.get(4999));
219         assertEquals(Integer.valueOf(0), list.get(5000));
220         assertEquals(Integer.valueOf(4999), list.get(9999));
221         assertEquals(5000, nextIndex);
222     }
223 
224     @Test
225     public void longRunningAddRemoveAddDoesntLeakMemory() {
226         String s = "s";
227         IndexedRingBuffer<String> list = IndexedRingBuffer.getInstance();
228         for (int i = 0; i < 20000; i++) {
229             int index = list.add(s);
230             list.remove(index);
231         }
232 
233         AtomicInteger c = new AtomicInteger();
234         list.forEach(newCounterAction(c));
235         assertEquals(0, c.get());
236         //        System.out.println("Index is: " + list.index.get() + " when it should be no bigger than " + list.SIZE);
237         assertTrue(list.index.get() < IndexedRingBuffer.SIZE);
238         // it should actually be 1 since we only did add/remove sequentially
239         assertEquals(1, list.index.get());
240     }
241 
242     @Test
243     public void testConcurrentAdds() throws InterruptedException {
244         final IndexedRingBuffer<Integer> list = IndexedRingBuffer.getInstance();
245 
246         Scheduler.Worker w1 = Schedulers.computation().createWorker();
247         Scheduler.Worker w2 = Schedulers.computation().createWorker();
248 
249         final CountDownLatch latch = new CountDownLatch(2);
250 
251         w1.schedule(new Action0() {
252 
253             @Override
254             public void call() {
255                 for (int i = 0; i < 10000; i++) {
256                     list.add(i);
257                 }
258                 latch.countDown();
259             }
260 
261         });
262         w2.schedule(new Action0() {
263 
264             @Override
265             public void call() {
266                 for (int i = 10000; i < 20000; i++) {
267                     list.add(i);
268                 }
269                 latch.countDown();
270             }
271 
272         });
273 
274         latch.await();
275 
276         w1.unsubscribe();
277         w2.unsubscribe();
278 
279         AtomicInteger c = new AtomicInteger();
280         list.forEach(newCounterAction(c));
281         assertEquals(20000, c.get());
282 
283         ArrayList<Integer> values = new ArrayList<Integer>();
284         list.forEach(accumulate(values));
285         Collections.sort(values);
286         int j = 0;
287         for (int i : values) {
288             assertEquals(i, j++);
289         }
290     }
291 
292     @Test
293     public void testConcurrentAddAndRemoves() throws InterruptedException {
294         final IndexedRingBuffer<Integer> list = IndexedRingBuffer.getInstance();
295 
296         final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
297 
298         Scheduler.Worker w1 = Schedulers.computation().createWorker();
299         Scheduler.Worker w2 = Schedulers.computation().createWorker();
300 
301         final CountDownLatch latch = new CountDownLatch(2);
302 
303         w1.schedule(new Action0() {
304 
305             @Override
306             public void call() {
307                 try {
308                     for (int i = 10000; i < 20000; i++) {
309                         list.add(i);
310                         //                        Integer v = list.remove(index);
311                     }
312                 } catch (Exception e) {
313                     e.printStackTrace();
314                     exceptions.add(e);
315                 }
316                 latch.countDown();
317             }
318 
319         });
320 
321         w2.schedule(new Action0() {
322 
323             @Override
324             public void call() {
325                 try {
326                     for (int i = 0; i < 10000; i++) {
327                         int index = list.add(i);
328                         // cause some random remove/add interference
329                         Integer v = list.remove(index);
330                         if (v == null) {
331                             throw new RuntimeException("should not get null");
332                         }
333                         list.add(v);
334                     }
335                 } catch (Exception e) {
336                     e.printStackTrace();
337                     exceptions.add(e);
338                 }
339                 latch.countDown();
340             }
341 
342         });
343 
344         latch.await();
345 
346         w1.unsubscribe();
347         w2.unsubscribe();
348 
349         AtomicInteger c = new AtomicInteger();
350         list.forEach(newCounterAction(c));
351         assertEquals(20000, c.get());
352 
353         ArrayList<Integer> values = new ArrayList<Integer>();
354         list.forEach(accumulate(values));
355         Collections.sort(values);
356         int j = 0;
357         for (int i : values) {
358             assertEquals(i, j++);
359         }
360 
361         if (exceptions.size() > 0) {
362             System.out.println("Exceptions: " + exceptions);
363         }
364         assertEquals(0, exceptions.size());
365     }
366 
367     private <T> Func1<T, Boolean> accumulate(final ArrayList<T> list) {
368         return new Func1<T, Boolean>() {
369 
370             @Override
371             public Boolean call(T t1) {
372                 list.add(t1);
373                 return true;
374             }
375 
376         };
377     }
378 
379     @SuppressWarnings("unused")
380     private Func1<Object, Boolean> print() {
381         return new Func1<Object, Boolean>() {
382 
383             @Override
384             public Boolean call(Object t1) {
385                 System.out.println("Object: " + t1);
386                 return true;
387             }
388 
389         };
390     }
391 
392     private Func1<Object, Boolean> newCounterAction(final AtomicInteger c) {
393         return new Func1<Object, Boolean>() {
394 
395             @Override
396             public Boolean call(Object t1) {
397                 c.incrementAndGet();
398                 return true;
399             }
400 
401         };
402     }
403 
404     public static class LSubscription implements Subscription {
405 
406         private final int n;
407 
408         public LSubscription(int n) {
409             this.n = n;
410         }
411 
412         @Override
413         public void unsubscribe() {
414 
415         }
416 
417         @Override
418         public boolean isUnsubscribed() {
419             return false;
420         }
421 
422         @Override
423         public String toString() {
424             return "Subscription=>" + n;
425         }
426     }
427 }